package com.yc.bigdata.flink.demo.schema;

import com.alibaba.fastjson.JSON;
import com.yc.bigdata.flink.demo.vo.UserAction;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

import javax.annotation.Nullable;

/**
 * <p>Kafka serialization schema that turns a {@link UserAction} into a
 * {@link ProducerRecord} for a fixed topic. The record value is the JSON
 * representation of the action, encoded as UTF-8 bytes.</p>
 *
 * @author: YuanChilde
 * @date: 2020-02-15 16:58
 * @version: 1.0
 * Modification History:
 * Date    Author      Version     Description
 * -----------------------------------------------------------------
 * 2020-02-15 16:58    YuanChilde     1.0        Created
 */
public class UserActionKafkaSchema implements KafkaSerializationSchema<UserAction> {

    /** Name of the Kafka topic every produced record is addressed to. */
    private final String topic;

    /**
     * Creates a schema that sends all records to the given topic.
     *
     * @param topic name of the target Kafka topic
     */
    public UserActionKafkaSchema(String topic) {
        this.topic = topic;
    }

    /**
     * Serializes one {@link UserAction} into a Kafka record for the configured topic.
     *
     * @param userAction the element to serialize; it is rendered with
     *                   {@link JSON#toJSONString(Object)}
     * @param aLong      timestamp supplied by the caller; not used by this schema
     * @return a record carrying the UTF-8 encoded JSON as its value; only topic and
     *         value are passed to the record constructor
     */
    @Override
    public ProducerRecord<byte[], byte[]> serialize(UserAction userAction, @Nullable Long aLong) {
        // The declared record type is <byte[], byte[]>, so the JSON text must be
        // converted to bytes. Handing over the String itself (possible only through a
        // raw ProducerRecord) would violate the declared value type.
        byte[] value = JSON.toJSONString(userAction).getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>(this.topic, value);
    }
}
